package drds.data_propagate.metadata;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.MigrateMap;
import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.entry.ClientId;
import drds.data_propagate.entry.position.Position;
import drds.data_propagate.entry.position.PositionRange;
import drds.data_propagate.metadata.exception.DataPropagateMetaManagerException;
import lombok.Getter;
import lombok.Setter;

import java.util.List;
import java.util.Map;

/**
 * In-memory implementation of {@link MetaDataManager}.
 *
 * <p>All state lives in three maps that are (re)created by {@link #start()} and emptied by
 * {@link #stop()}; nothing in this class writes the data anywhere else.
 *
 * <p>The subscription methods are {@code synchronized} on this instance because the per-task
 * subscriber lists are plain lists. The position and batch methods are not synchronized here and
 * rely on whatever guarantees the underlying maps and {@link BatchIdToPositionRangeMap} provide.
 */
public class MemoryMetaDataManager extends AbstractLifeCycle implements MetaDataManager {
    /** Subscribed clients, grouped by the task id reported by {@code ClientId#getTaskId()}. */
    @Setter
    @Getter
    protected Map<String, List<ClientId>> taskIdToClientIdListMap;
    /** Per-client container of the position ranges recorded for each batch id. */
    @Setter
    @Getter
    protected Map<ClientId, BatchIdToPositionRangeMap> clientIdToBatchIdToPositionRangeMapMap;
    /** Last position stored for each client via {@link #updatePosition(ClientId, Position)}. */
    @Setter
    @Getter
    protected Map<ClientId, Position> clientIdToIdMap;

    /**
     * Starts the life cycle and creates fresh, empty maps.
     *
     * <p>The batch map and the subscriber map are built with {@code MigrateMap.makeComputingMap},
     * passing a function that supplies the value for a key (a new
     * {@link BatchIdToPositionRangeMap} for the client, or a new empty list for the task id).
     * NOTE(review): the exact population semantics are defined by {@code MigrateMap}, which is
     * outside this file.
     */
    public void start() {
        super.start();

        clientIdToBatchIdToPositionRangeMapMap = MigrateMap.makeComputingMap(new Function<ClientId, BatchIdToPositionRangeMap>() {

            public BatchIdToPositionRangeMap apply(ClientId clientId) {
                return BatchIdToPositionRangeMap.create(clientId);
            }

        });

        clientIdToIdMap = new MapMaker().makeMap();

        taskIdToClientIdListMap = MigrateMap.makeComputingMap(new Function<String, List<ClientId>>() {

            public List<ClientId> apply(String taskId) {
                return Lists.newArrayList();
            }
        });
    }

    /**
     * Stops the life cycle and discards the held state.
     *
     * <p>The subscriber and position maps are cleared. For the batch map only the position ranges
     * inside each per-client container are cleared; the containers themselves stay registered
     * until the next {@link #start()} replaces the map.
     */
    public void stop() {
        super.stop();

        taskIdToClientIdListMap.clear();
        clientIdToIdMap.clear();
        for (BatchIdToPositionRangeMap batch : clientIdToBatchIdToPositionRangeMapMap.values()) {
            batch.clearPositionRanges();
        }
    }

    /**
     * Registers a client under its task id.
     *
     * <p>If an equal client is already registered it is removed first, so the client ends up as
     * the last element of the list and the instance passed in replaces the stored one.
     *
     * @param clientId the client to subscribe; its task id selects the subscriber list
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public synchronized void subscribe(ClientId clientId) throws DataPropagateMetaManagerException {
        String taskId = clientId.getTaskId();
        List<ClientId> clientIdList = taskIdToClientIdListMap.get(taskId);
        if (clientIdList == null) {
            // Only reachable when the map was replaced (setter/subclass) by one that does not
            // create values on lookup; register a list instead of failing with an NPE.
            clientIdList = Lists.newArrayList();
            taskIdToClientIdListMap.put(taskId, clientIdList);
        }

        // remove(Object) is a no-op when the element is absent, so no contains() pre-check.
        clientIdList.remove(clientId);
        clientIdList.add(clientId);
    }

    /**
     * Tells whether the client is currently registered under its task id.
     *
     * @param clientId the client to look up
     * @return {@code true} if the task's subscriber list exists and contains an equal client
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public synchronized boolean hasSubscribe(ClientId clientId) throws DataPropagateMetaManagerException {
        List<ClientId> clientIds = taskIdToClientIdListMap.get(clientId.getTaskId());
        return clientIds != null && clientIds.contains(clientId);
    }

    /**
     * Removes the client from its task's subscriber list; does nothing if it is not registered.
     *
     * @param clientId the client to unsubscribe
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public synchronized void unsubscribe(ClientId clientId) throws DataPropagateMetaManagerException {
        List<ClientId> clientIds = taskIdToClientIdListMap.get(clientId.getTaskId());
        if (clientIds != null) {
            // remove(Object) already tolerates a missing element.
            clientIds.remove(clientId);
        }
    }

    /**
     * Lists the clients subscribed to a task.
     *
     * @param taskId the task id to look up
     * @return a new list holding the current subscribers; empty when there are none. The copy is
     *         deliberate (issue #657): handing out the internal list let callers iterate it while
     *         subscribe/unsubscribe mutated it, causing ConcurrentModificationException.
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public synchronized List<ClientId> listAllSubscribeInfo(String taskId)
            throws DataPropagateMetaManagerException {
        List<ClientId> clientIds = taskIdToClientIdListMap.get(taskId);
        if (clientIds == null) {
            // Same null tolerance as hasSubscribe/unsubscribe.
            return Lists.newArrayList();
        }
        return Lists.newArrayList(clientIds);
    }

    /**
     * Returns the position last stored for the client.
     *
     * @param clientId the client to look up
     * @return the stored position, or whatever the position map yields for an unknown key
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public Position getPosition(ClientId clientId) throws DataPropagateMetaManagerException {
        return clientIdToIdMap.get(clientId);
    }

    /**
     * Stores the position for the client, replacing any previous value.
     *
     * @param clientId the client whose position is updated
     * @param position the new position
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public void updatePosition(ClientId clientId, Position position) throws DataPropagateMetaManagerException {
        clientIdToIdMap.put(clientId, position);
    }

    /**
     * Adds a position range to the client's batch container.
     *
     * @param clientId      the owning client
     * @param positionRange the range to record
     * @return the value returned by {@code BatchIdToPositionRangeMap#addPositionRange(PositionRange)}
     *         (presumably the batch id assigned to the range; verify against that class)
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public Long addPositionRange(ClientId clientId, PositionRange positionRange) throws DataPropagateMetaManagerException {
        return clientIdToBatchIdToPositionRangeMapMap.get(clientId).addPositionRange(positionRange);
    }

    /**
     * Adds a position range to the client's batch container under an explicit batch id.
     *
     * @param clientId      the owning client
     * @param positionRange the range to record
     * @param batchId       the batch id to record the range under
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public void addPositionRange(ClientId clientId, PositionRange positionRange, Long batchId)
            throws DataPropagateMetaManagerException {
        // Add the record under the specified batchId.
        clientIdToBatchIdToPositionRangeMapMap.get(clientId).addPositionRange(positionRange, batchId);
    }

    /**
     * Removes the position range recorded for the given batch id.
     *
     * @param clientId the owning client
     * @param batchId  the batch id to remove
     * @return the value returned by {@code BatchIdToPositionRangeMap#removePositionRange(Long)}
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public PositionRange removePositionRange(ClientId clientId, Long batchId) throws DataPropagateMetaManagerException {
        return clientIdToBatchIdToPositionRangeMapMap.get(clientId).removePositionRange(batchId);
    }

    /**
     * Looks up the position range recorded for the given batch id.
     *
     * @param clientId the owning client
     * @param batchId  the batch id to look up
     * @return the value returned by {@code BatchIdToPositionRangeMap#getPositionRange(Long)}
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public PositionRange getPositionRange(ClientId clientId, Long batchId) throws DataPropagateMetaManagerException {
        return clientIdToBatchIdToPositionRangeMapMap.get(clientId).getPositionRange(batchId);
    }

    /**
     * Returns the client's latest position range, as reported by
     * {@code BatchIdToPositionRangeMap#getLastestPositionRange()}.
     *
     * @param clientId the owning client
     * @return the latest position range according to the client's batch container
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public PositionRange getLastestPositionRange(ClientId clientId) throws DataPropagateMetaManagerException {
        return clientIdToBatchIdToPositionRangeMapMap.get(clientId).getLastestPositionRange();
    }

    /**
     * Returns the client's first position range, as reported by
     * {@code BatchIdToPositionRangeMap#getFirstPositionRange()}.
     *
     * @param clientId the owning client
     * @return the first position range according to the client's batch container
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public PositionRange getFirstPositionRange(ClientId clientId) throws DataPropagateMetaManagerException {
        return clientIdToBatchIdToPositionRangeMapMap.get(clientId).getFirstPositionRange();
    }

    /**
     * Lists every position range recorded for the client, keyed by batch id.
     *
     * @param clientId the owning client
     * @return the map returned by {@code BatchIdToPositionRangeMap#listAllPositionRange()}
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public Map<Long, PositionRange> listAllBatchs(ClientId clientId) throws DataPropagateMetaManagerException {
        return clientIdToBatchIdToPositionRangeMapMap.get(clientId).listAllPositionRange();
    }

    /**
     * Clears every position range recorded for the client.
     *
     * @param clientId the owning client
     * @throws DataPropagateMetaManagerException declared by the interface; not thrown here
     */
    public void clearAllBatchs(ClientId clientId) throws DataPropagateMetaManagerException {
        clientIdToBatchIdToPositionRangeMapMap.get(clientId).clearPositionRanges();
    }

}
